/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.lvyh.lightframe.core.consumer.cluster;

import com.alibaba.fastjson.JSON;
import com.lvyh.lightframe.common.util.CommonUtils;
import com.lvyh.lightframe.common.constant.RpcConstants;
import com.lvyh.lightframe.common.RpcContext;
import com.lvyh.lightframe.core.provider.ProviderGroup;
import com.lvyh.lightframe.core.provider.ProviderHelper;
import com.lvyh.lightframe.core.provider.ProviderInfo;
import com.lvyh.lightframe.core.consumer.route.LoadBalance;
import com.lvyh.lightframe.core.consumer.transport.AbstractClient;
import com.lvyh.lightframe.common.ext.ExtensionLoaderFactory;
import com.lvyh.lightframe.core.invoke.response.ResponseCallback;
import com.lvyh.lightframe.core.invoke.request.RpcRequest;
import com.lvyh.lightframe.core.invoke.response.RpcResponseFuture;
import com.lvyh.lightframe.core.invoke.response.RpcResponse;
import com.lvyh.lightframe.common.exception.RpcRuntimeException;
import com.lvyh.lightframe.registry.Registry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Base class for consumer-side cluster strategies.
 *
 * <p>It tracks the number of in-flight invocations, resolves the provider list for a
 * request through the configured {@link Registry} extension, and implements the four
 * invoke styles (sync, future, callback, oneway) on top of an {@link AbstractClient}.
 * Subclasses supply the actual strategy by implementing {@link #doInvoke(RpcRequest)}.
 *
 * @author lvyh 2021/05/16.
 */
public abstract class AbstractCluster extends Cluster {
    private static final Logger logger = LoggerFactory.getLogger(AbstractCluster.class);

    /**
     * Load balancer handed in at construction time; available to subclasses.
     */
    protected LoadBalance loadBalance;

    /**
     * The number of calls currently being sent by the client
     */
    protected AtomicInteger countOfInvoke = new AtomicInteger(0);

    /**
     * @param loadBalance the load balancer stored in {@link #loadBalance}
     */
    public AbstractCluster(LoadBalance loadBalance) {
        this.loadBalance = loadBalance;
    }

    /**
     * Delegates to {@link #doInvoke(RpcRequest)} while keeping {@link #countOfInvoke}
     * incremented for the duration of the call.
     *
     * @param request the request to send
     * @return whatever {@link #doInvoke(RpcRequest)} returns
     * @throws RpcRuntimeException propagated unchanged from {@link #doInvoke(RpcRequest)}
     */
    @Override
    public RpcResponse invoke(RpcRequest request) throws RpcRuntimeException {
        countOfInvoke.incrementAndGet(); // count+1
        try {
            return doInvoke(request);
        } finally {
            // Always undo the increment, whether the call returned or threw.
            countOfInvoke.decrementAndGet(); // count-1
        }
    }

    /**
     * Strategy hook: performs the actual invocation for the given request.
     *
     * @param msg the request to send
     * @return the response of the invocation
     * @throws RpcRuntimeException if the invocation fails
     */
    protected abstract RpcResponse doInvoke(RpcRequest msg) throws RpcRuntimeException;

    /**
     * Load balancing according to rules: discovers the providers for the request and
     * lets the given load balancer pick one of them.
     *
     * @param rpcRequest  the request whose class name and version identify the service
     * @param loadBalance the load balancer used for the selection (the argument, not the field)
     * @return the provider chosen by {@code loadBalance}
     */
    protected ProviderInfo select(RpcRequest rpcRequest, LoadBalance loadBalance) throws RpcRuntimeException {
        List<ProviderInfo> providerInfoList = getProviderInfoList(rpcRequest);
        return loadBalance.select(rpcRequest, providerInfoList);
    }

    /**
     * Returns every discovered provider for the request; no load balancing is applied.
     *
     * @param rpcRequest the request whose class name and version identify the service
     * @return the provider list converted from the registry's address list
     */
    protected List<ProviderInfo> select(RpcRequest rpcRequest) throws RpcRuntimeException {
        return getProviderInfoList(rpcRequest);
    }

    /**
     * Looks up the registry extension named by {@link RpcContext#getRegistry()}, discovers the
     * addresses registered under the request's service key and converts them to providers.
     */
    private List<ProviderInfo> getProviderInfoList(RpcRequest rpcRequest) {
        Registry registry = ExtensionLoaderFactory.getExtensionLoader(Registry.class).
                getExtension(RpcContext.getRegistry(), new Class[]{String.class}, new Object[]{RpcContext.getZkAddress()});
        String serviceKey = CommonUtils.buildServiceUniqueCode(rpcRequest.getClassName(), rpcRequest.getVersion());
        List<String> addressList = registry.discover(serviceKey);
        return ProviderHelper.convertToProviderList(addressList);
    }

    /**
     * Intentionally a no-op in this base class.
     */
    @Override
    public void addProvider(ProviderGroup providerGroup) {
    }

    /**
     * Intentionally a no-op in this base class.
     */
    @Override
    public void removeProvider(ProviderGroup providerGroup) {
    }

    /**
     * Intentionally a no-op in this base class.
     */
    @Override
    public void updateProviders(ProviderGroup providerGroup) {
    }

    /**
     * Intentionally a no-op in this base class.
     */
    @Override
    public void updateAllProviders(List<ProviderGroup> providerGroups) {
    }

    /**
     * @return always {@code true} in this base class
     */
    @Override
    public boolean isAvailable() {
        return true;
    }

    /**
     * Executes the request through the given client and wraps the outcome in a new
     * {@link RpcResponse} carrying the request id.
     *
     * @param client  the transport client used to send the request
     * @param request the request to send
     * @return a response whose result is the value returned by
     *         {@link #executeInvoke(AbstractClient, RpcRequest)} (may be {@code null})
     * @throws Exception if sending or waiting for the result fails
     */
    protected RpcResponse doInvoke(AbstractClient client, RpcRequest request) throws Exception {
        RpcResponse response = new RpcResponse();
        Object object = executeInvoke(client, request);
        response.setResult(object);
        response.setRequestId(request.getRequestId());
        return response;
    }

    /**
     * Sends the request according to {@code request.getInvokeType()}.
     *
     * <ul>
     *   <li>sync: sends, waits up to {@code request.getTimeout()} milliseconds and returns the result;</li>
     *   <li>future: sends and returns {@code null} immediately;</li>
     *   <li>callback: sends with a logging callback attached and returns {@code null};</li>
     *   <li>oneway: sends and returns {@code null};</li>
     *   <li>anything else: nothing is sent, a warning is logged and {@code null} is returned.</li>
     * </ul>
     *
     * @param client  the transport client used to send the request
     * @param request the request to send
     * @return the remote result for sync invocations, otherwise {@code null}
     * @throws Exception if sending fails; sync/future/callback failures are wrapped in
     *                   {@link RpcRuntimeException}
     */
    protected Object executeInvoke(AbstractClient client, RpcRequest request) throws Exception {
        String invokeType = request.getInvokeType();
        //request.setRequestId(CommonIdUtil.generateSimpleUUID());
        logger.info("[AbstractCluster] print invoke request id:{}", request.getRequestId());

        if (RpcConstants.INVOKER_TYPE_SYNC.equals(invokeType)) {
            return invokeSync(client, request);
        }
        if (RpcConstants.INVOKER_TYPE_FUTURE.equals(invokeType)) {
            return invokeFuture(client, request);
        }
        if (RpcConstants.INVOKER_TYPE_CALLBACK.equals(invokeType)) {
            return invokeCallback(client, request);
        }
        if (RpcConstants.INVOKER_TYPE_ONEWAY.equals(invokeType)) {
            // Fire and forget: no future is created and no result is awaited.
            client.send(request);
            return null;
        }

        // Previously this case returned null silently; keep the return value but make it visible.
        logger.warn("[AbstractCluster] unsupported invoke type:{}, request is not sent, request id:{}",
                invokeType, request.getRequestId());
        return null;
    }

    /**
     * Sync invoke: sends the request and blocks for the response up to the request timeout.
     * Declared {@code throws Exception} only to mirror {@link #executeInvoke}.
     */
    private Object invokeSync(AbstractClient client, RpcRequest request) throws Exception {
        // The future is created before sending, as in the original flow.
        RpcResponseFuture responseFuture = new RpcResponseFuture(request);
        try {
            client.send(request);
            // Guarded so the JSON serialization cost is only paid when INFO is enabled.
            if (logger.isInfoEnabled()) {
                logger.info("[AbstractCluster] consumer start to invoke romote service, request:{}", JSON.toJSONString(request));
            }

            RpcResponse rpcResponse = responseFuture.get(request.getTimeout(), TimeUnit.MILLISECONDS);
            if (logger.isInfoEnabled()) {
                logger.info("[AbstractCluster] consumer get response future, response:{}", JSON.toJSONString(rpcResponse));
            }
            if (rpcResponse.isError()) {
                throw new RpcRuntimeException(rpcResponse.getErrorMsg());
            }
            return rpcResponse.getResult();
        } catch (Exception e) {
            restoreInterrupt(e);
            logger.error("[AbstractCluster] invoke romote service error, serverAddress:{}, request{}",
                    request.getServerAddress(), request, e);
            throw new RpcRuntimeException("invoke error,", e);
        } finally {
            // Presumably un-registers the pending future; done on success and failure alike.
            responseFuture.removeResponseFuture();
        }
    }

    /**
     * Future invoke: sends the request and returns {@code null} without waiting. The created
     * future is only removed here when sending fails.
     */
    private Object invokeFuture(AbstractClient client, RpcRequest request) throws Exception {
        RpcResponseFuture responseFuture = new RpcResponseFuture(request);
        try {
            client.send(request);
            return null;
        } catch (Exception e) {
            restoreInterrupt(e);
            logger.error("invoke error, serverAddress:{}, request{}", request.getServerAddress(), request, e);
            responseFuture.removeResponseFuture();
            throw new RpcRuntimeException("invoke error,", e);
        }
    }

    /**
     * Callback invoke: attaches a callback that logs the result, sends the request and returns
     * {@code null}. The created future is only removed here when sending fails.
     */
    private Object invokeCallback(AbstractClient client, RpcRequest request) throws Exception {
        ResponseCallback responseCallback = new ResponseCallback<Object>() {
            @Override
            public void onSuccess(Object result) {
                // Pass the object itself: a null result must not raise a NullPointerException.
                logger.info("responseCallback, result:{}", result);
            }

            @Override
            public void onException(Throwable e) {
                throw new RpcRuntimeException("invoke error,", e);
            }
        };
        RpcResponseFuture futureResponse = new RpcResponseFuture(request, responseCallback);
        try {
            client.send(request);
            return null;
        } catch (Exception e) {
            restoreInterrupt(e);
            logger.error("rpc invoke error, serverAddress:{}, request{}", request.getServerAddress(), request, e);
            futureResponse.removeResponseFuture();
            throw new RpcRuntimeException("invoke error,", e);
        }
    }

    /**
     * Re-asserts the thread's interrupt flag when the caught exception is an
     * {@link InterruptedException}, so callers further up can still observe the interruption.
     */
    private static void restoreInterrupt(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }
}
